#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import packagelib
import storagelib
import testlib


@testlib.nondestructive
class TestStorageNfs(storagelib.StorageCase):

    def testNfsClient(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        m.execute(f"mkdir {self.mnt_dir}/foo {self.mnt_dir}/bar {self.mnt_dir}/test")
        self.write_file("/etc/exports", f"{self.mnt_dir}/foo 127.0.0.0/24(rw)\n{self.mnt_dir}/bar 127.0.0.0/24(rw)\n",
                        post_restore_action="systemctl restart nfs-server")
        m.execute("systemctl restart nfs-server")
        # Removing the nfs mount removes the target dir, if the test fails it
        # won't. It's important to umount before restoring /etc/exports as
        # otherwise nfs is confused and we can't umount the share.
        self.addCleanup(m.execute, f"if [ -e {self.mnt_dir}/test ]; then rm -r {self.mnt_dir}/test; fi")
        self.addCleanupMount(f"{self.mnt_dir}/test")

        orig_fstab = m.execute("cat /etc/fstab")

        # Add /foo
        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        self.dialog_set_val("server", "127.0.0.1")
        self.dialog_set_val("remote", f"{self.mnt_dir}/foo")
        self.dialog_set_val("dir", f"{self.mnt_dir}/test")
        self.dialog_apply()
        self.dialog_wait_close()

        b.wait_visible(self.card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/foo"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}/test"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}/test") + " .usage-bar[role=progressbar]")

        # Should be saved to fstab
        self.assertEqual(m.execute("cat /etc/fstab"), orig_fstab +
                         f"127.0.0.1:{self.mnt_dir}/foo {self.mnt_dir}/test nfs defaults\n")

        # Try to add some non-exported directory
        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        self.dialog_set_val("server", "127.0.0.1")
        b.set_input_text(self.dialog_field("remote") + " input", "/usr/share")
        b.click(".pf-v6-c-menu ul > li > button")
        self.dialog_set_val("dir", f"{self.mnt_dir}/share")
        self.dialog_apply()
        b.wait_visible("#dialog .pf-v6-c-alert.pf-m-danger")
        self.dialog_cancel()
        self.dialog_wait_close()

        # Add /bar
        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        self.dialog_set_val("server", "127.0.0.1")
        self.dialog_set_val("remote", f"{self.mnt_dir}/bar")
        self.dialog_set_val("dir", f"{self.mnt_dir}s/bar")
        self.dialog_apply()
        self.dialog_wait_close()

        b.wait_visible(self.card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/bar"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}s/bar"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}s/bar") + " .usage-bar[role=progressbar]")
        m.execute(f"test -d {self.mnt_dir}s/bar")

        # Go to details of /bar
        self.click_card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/bar")
        b.wait_text(self.card_desc("NFS mount", "Server"), f"127.0.0.1:{self.mnt_dir}/bar")
        b.wait_text(self.card_desc("NFS mount", "Mount point"), f"{self.mnt_dir}s/bar")

        # Change mount point of /bar
        b.click(self.card_button("NFS mount", "Edit"))
        self.dialog_wait_open()
        self.dialog_set_val("dir", f"{self.mnt_dir}s/barbar")
        self.dialog_apply()
        self.dialog_wait_close()
        self.addCleanup(m.execute, f"umount {self.mnt_dir}s/barbar; rmdir {self.mnt_dir}s/barbar")

        b.wait_text(self.card_desc("NFS mount", "Mount point"), f"{self.mnt_dir}s/barbar")
        m.execute(f"! test -e {self.mnt_dir}s/bar")
        m.execute(f"test -d {self.mnt_dir}s/barbar")
        self.assertEqual(m.execute(f"findmnt -s -n -o OPTIONS {self.mnt_dir}s/barbar").strip(), "defaults")

        # Set options for /barbar
        b.click(self.card_button("NFS mount", "Edit"))

        def wait_checked(field: str) -> None:
            b.wait_visible(self.dialog_field(field) + ":checked")

        def wait_not_checked(field: str) -> None:
            b.wait_visible(self.dialog_field(field) + ":not(:checked)")

        self.dialog_wait_open()
        wait_checked("mount_options.auto")
        wait_not_checked("mount_options.ro")
        self.dialog_set_val("mount_options.auto", val=False)
        self.dialog_set_val("mount_options.ro", val=True)
        self.dialog_set_val("mount_options.extra", "ac")
        self.dialog_apply()
        self.dialog_wait_close()

        self.assertEqual(m.execute(f"findmnt -s -n -o OPTIONS {self.mnt_dir}s/barbar").strip(), "noauto,ro,ac")

        # Should be saved to fstab
        self.assertEqual(m.execute("cat /etc/fstab"), orig_fstab +
                         f"127.0.0.1:{self.mnt_dir}/foo {self.mnt_dir}/test nfs defaults\n" +
                         f"127.0.0.1:{self.mnt_dir}/bar {self.mnt_dir}s/barbar nfs noauto,ro,ac\n")

        # Go to details of /foo
        b.go("#/")
        self.click_card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/foo")
        b.wait_text(self.card_desc("NFS mount", "Server"), f"127.0.0.1:{self.mnt_dir}/foo")
        b.wait_text(self.card_desc("NFS mount", "Mount point"), f"{self.mnt_dir}/test")

        # Unmount and remount /foo
        b.click(self.card_button("NFS mount", "Unmount"))
        b.click(self.card_button("NFS mount", "Mount"))
        b.wait_visible(self.card_button("NFS mount", "Unmount"))

        # Remove /foo
        self.click_card_dropdown("NFS mount", "Remove")
        b.wait_visible(self.card("Storage"))
        b.wait_not_present(self.card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/foo"))
        m.execute(f"! test -e {self.mnt_dir}/test")
        # Should be removed from fstab, too
        self.assertEqual(m.execute("cat /etc/fstab"), orig_fstab +
                         f"127.0.0.1:{self.mnt_dir}/bar {self.mnt_dir}s/barbar nfs noauto,ro,ac\n")

        # Picks up mounts in fstab
        m.execute(f"echo '1.2.3.4:/something {self.mnt_dir}/somewhere nfs defaults 0 0' >> /etc/fstab")
        b.wait_visible(self.card_row("Storage", name="1.2.3.4:/something"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}/somewhere (not mounted)"))

        # Ignores non-FS mounts which look similar
        m.execute(f"echo '2.3.4.5:/marmalade {self.mnt_dir}/dunno rfs defaults 0 0' >> /etc/fstab")
        # But recognizes variants like "nfs4"
        m.execute(f"echo '5.6.7.8:/stuff {self.mnt_dir}/four nfs4 defaults 0 0' >> /etc/fstab")
        b.wait_visible(self.card_row("Storage", name="5.6.7.8:/stuff"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}/four (not mounted)"))
        b.wait_not_present(self.card_row("Storage", name="2.3.4.5:/marmalade"))

    def testNfsListExports(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        m.execute(f"mkdir {self.mnt_dir}/foo {self.mnt_dir}/bar")
        self.write_file("/etc/exports", f"{self.mnt_dir}/foo 127.0.0.0/24(rw)\n{self.mnt_dir}/bar 127.0.0.0/24(rw)\n",
                        post_restore_action="systemctl restart nfs-server")
        m.execute("systemctl restart nfs-server")

        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        self.dialog_set_val("server", "127.0.0.1")

        def wait_for_exports():
            def check():
                choices = self.dialog_combobox_choices("remote")
                return len(choices) == 2 and f"{self.mnt_dir}/foo" in choices and f"{self.mnt_dir}/bar" in choices
            self.retry(check)

        b.click('#dialog [data-field="remote"] button.pf-v6-c-menu-toggle__button')
        wait_for_exports()

    def testNfsMountWithoutDiscovery(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        m.execute(f"mkdir {self.mnt_dir}/srv-foo {self.mnt_dir}/srv-bar")
        self.addCleanup(m.execute, "systemctl restart nfs-server")
        self.write_file("/etc/exports",
                        f"{self.mnt_dir}/srv-foo 127.0.0.0/24(rw)\n{self.mnt_dir}/srv-bar 127.0.0.0/24(rw)\n",
                        post_restore_action="systemctl restart nfs-server")
        m.execute("systemctl restart nfs-server")

        # Break showmount.  Cockpit uses showmount to list exported
        # directories, but that relies on a properly configured
        # firewall etc.  Even if showmount doesn't work, Cockpit
        # should allow people to mount arbitrary directories.
        m.execute("mount -o bind /dev/null /usr/sbin/showmount")
        self.addCleanup(m.execute, "umount /usr/sbin/showmount")

        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        self.dialog_set_val("server", "127.0.0.1")
        # Manually add a remote location to the select
        b.set_input_text("[data-field=remote] input", f"{self.mnt_dir}/srv-foo")
        b.click(".pf-v6-c-menu button.pf-v6-c-menu__item")
        self.dialog_set_val("dir", f"{self.mnt_dir}/foo")
        self.dialog_apply()
        self.dialog_wait_close()
        self.addCleanup(m.execute, f"umount {self.mnt_dir}/foo")

        b.wait_visible(self.card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/srv-foo"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}/foo"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}/foo") + " .usage-bar[role=progressbar]")

    def testNfsBusy(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        m.execute(f"mkdir {self.mnt_dir}/foo {self.mnt_dir}/bar")
        self.addCleanup(m.execute, "systemctl restart nfs-server")
        self.write_file("/etc/exports", f"{self.mnt_dir}/foo 127.0.0.0/24(rw)\n{self.mnt_dir}/bar 127.0.0.0/24(rw)\n",
                        post_restore_action="systemctl restart nfs-server")
        m.execute("systemctl restart nfs-server")

        # Add /foo
        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        self.dialog_set_val("server", "127.0.0.1")
        self.dialog_set_val("remote", f"{self.mnt_dir}/foo")
        self.dialog_set_val("dir", f"{self.mnt_dir}s/foo")
        self.dialog_apply()
        self.dialog_wait_close()

        b.wait_visible(self.card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/foo"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}s/foo"))
        b.wait_visible(self.card_row("Storage", location=f"{self.mnt_dir}s/foo") + " .usage-bar[role=progressbar]")

        # Go to details of /foo
        self.click_card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/foo")
        b.wait_text(self.card_desc("NFS mount", "Server"), f"127.0.0.1:{self.mnt_dir}/foo")
        b.wait_text(self.card_desc("NFS mount", "Mount point"), f"{self.mnt_dir}s/foo")

        sleep_pid = m.spawn(f"cd {self.mnt_dir}s/foo; sleep infinity", "busy")
        b.click(self.card_button("NFS mount", "Edit"))

        self.dialog_wait_open()
        self.dialog_wait_alert("This NFS mount is in use")
        self.dialog_cancel()
        self.dialog_wait_close()

        b.click(self.card_button("NFS mount", "Unmount"))
        self.dialog_wait_open()
        b.wait_in_text("#dialog", str(sleep_pid))
        b.wait_in_text("#dialog", "sleep infinity")
        b.wait_in_text("#dialog", "The listed processes will be forcefully stopped.")
        b.wait_in_text("#dialog", "Stop and unmount")
        b.assert_pixels("#dialog", "unmount", mock={"td[data-label='PID']": "1234",
                                                    "td[data-label='Started']": "a little while ago"})
        self.dialog_apply()
        self.dialog_wait_close()

        b.click(self.card_button("NFS mount", "Mount"))
        b.wait_visible(self.card_button("NFS mount", "Unmount"))

        sleep_pid = m.spawn(f"cd {self.mnt_dir}s/foo; sleep infinity", "busy")
        self.click_card_dropdown("NFS mount", "Remove")
        b.wait_in_text("#dialog", str(sleep_pid))
        b.wait_in_text("#dialog", "sleep infinity")
        b.wait_in_text("#dialog", "The listed processes will be forcefully stopped.")
        b.wait_in_text("#dialog", "Stop and remove")
        self.dialog_apply()
        self.dialog_wait_close()

        # We are back on the Overview with nothing there
        b.wait_visible(self.card("Storage"))
        b.wait_not_present(self.card_row("Storage", name=f"127.0.0.1:{self.mnt_dir}/foo"))
        b.wait_not_present(self.card_row("Storage", location=f"{self.mnt_dir}s/foo"))


# Re-use allowed journal messages from StorageCase
@testlib.skipOstree("No udisks/cockpit-storaged on OSTree images")
@testlib.nondestructive
class TestStoragePackagesNFS(packagelib.PackageCase, storagelib.StorageCase):

    def testNfsMissingPackages(self):
        m = self.machine
        b = self.browser

        m.execute("systemctl restart nfs-server")

        # Override configuration so that we don't have to remove the
        # real package.

        self.machine.write("/etc/cockpit/storaged.override.json",
                           """{ "config": { "nfs_client_package": "fake-nfs-utils" } }""")

        if m.execute("if test -e /sbin/mount.nfs; then echo yes; fi").strip():
            m.execute("mv /sbin/mount.nfs /sbin/mount.nfs.off")
            self.addCleanup(m.execute, "mv /sbin/mount.nfs.off /sbin/mount.nfs")

        self.login_and_go("/storage")

        # The fake-nfs-utils package is not available yet

        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        b.wait_in_text("#dialog", "fake-nfs-utils is not available from any repository.")
        self.dialog_cancel()
        self.dialog_wait_close()

        # Now make the package available

        self.createPackage("dummy", "1.0", "1")
        self.createPackage("fake-nfs-utils", "1.0", "1", depends="fake-libnfs")
        self.createPackage("fake-libnfs", "1.0", "1")
        self.enableRepo()

        # HACK
        #
        # The first simulated install seems to silently not report
        # anything on the Debian test images, for unknown reasons.  So
        # we install a dummy package to warm up all parts of the
        # machinery and distribute the fluids evenly.
        # https://github.com/PackageKit/PackageKit/issues/893
        #
        if "debian" in m.image or "ubuntu" in m.image:
            m.execute("pkcon refresh; pkcon install -y dummy")

        # HACK
        # Packagekit on Arch Linux does not deal well with detecting new repositories
        if m.image == "arch":
            m.execute("pkcon refresh force")

        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        b.wait_in_text("#dialog", "fake-nfs-utils will be installed")
        b.wait_in_text("#dialog", "fake-libnfs")
        self.dialog_apply()
        self.dialog_set_val("server", "127.0.0.1")
        self.dialog_cancel()
        self.dialog_wait_close()

        # Now we should go straight to the main dialog
        self.click_dropdown(self.card_header("Storage"), "New NFS mount")
        self.dialog_wait_open()
        self.dialog_set_val("server", "127.0.0.1")
        self.dialog_cancel()
        self.dialog_wait_close()


if __name__ == '__main__':
    testlib.test_main()
